#!/usr/bin/python
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import re
import shutil
from StringIO import StringIO
import tempfile
import unittest

import factory_common  # pylint: disable=W0611
from cros.factory.test import factory
from cros.factory.tools import audit_source_hashes
from cros.factory.utils import file_utils
from cros.factory.utils.process_utils import Spawn


class AuditSourceHashesTest(unittest.TestCase):

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp(prefix='audit_source_hashes_unittest.')

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def testBadReport(self):
    """Tests a report file that doesn't contain any events."""
    out = StringIO()
    bad_report = os.path.join(self.tmpdir, 'bad_report.tar.xz')
    file_utils.TouchFile(bad_report)
    self.assertRaisesRegexp(
        SystemExit, '^1$',
        audit_source_hashes.main, [bad_report], out)
    self.assertRegexpMatches(
        out.getvalue(),
        r'(?s).+AuditException: Unable to read events from report.+'
        r'\(tar returned 2\).+'
        r'Found 0 mismatched hashes and 1 exceptions.\n'
        r'FAILED \(0/1 samples passed\).\n$')

  def testThisSourceTree(self):
    """Tests running on the present source tree.

    This is comparing the tree to itself, so it should succeed."""
    out = StringIO()
    self.assertRaisesRegexp(
        SystemExit, '^0$', audit_source_hashes.main,
        [os.path.join(factory.FACTORY_PATH, 'py')], out)
    self.assertEquals('PASSED (1/1 samples passed).\n', out.getvalue())

  def testGooftoolLogSourceHashes(self):
    """'End-to-end' test using 'gooftool log_source_hashes'."""
    # Use tempdir as state root, so we don't end up going through a bunch
    # of old event logs.
    os.environ['CROS_FACTORY_ROOT'] = self.tmpdir

    # Log the source hashes for this source tree.
    Spawn([os.path.join(factory.FACTORY_PATH, 'bin', 'gooftool'),
           'log_source_hashes'], log=True, check_call=True)

    # We should find the event in this log.  Check that it works.
    event_log_path = os.path.join(self.tmpdir, 'state', 'events', 'events')
    out = StringIO()
    self.assertRaisesRegexp(
        SystemExit, '^0$',
        audit_source_hashes.main, [event_log_path], out)

    # Change the hash of this source file in the event log entry.  It
    # should fail now.
    data = file_utils.ReadFile(event_log_path)
    data = re.sub(r'^(\s+tools/audit_source_hashes_unittest\.py: ).+',
                  r'\1deadbeef', data, flags=re.MULTILINE)
    bad_log_path = os.path.join(self.tmpdir, 'events')
    file_utils.WriteFile(bad_log_path, data)

    def AssertMismatch(log_path):
      out = StringIO()
      self.assertRaisesRegexp(SystemExit, '^1$',
                              audit_source_hashes.main, [log_path], out)
      self.assertRegexpMatches(
          out.getvalue(),
          r'In sample .+:\n'
          r'- tools/audit_source_hashes_unittest\.py: hash mismatch '
          r'\(expected .+, found deadbeef\)\n\n'
          r'Found 1 mismatched hashes and 0 exceptions\.\n'
          r'FAILED \(0/1 samples passed\)\.\n')

    # First try with the event log file itself.
    AssertMismatch(bad_log_path)
    # Build a fake report containing the events.  It should fail in the
    # same way.
    report = os.path.join(self.tmpdir, 'report.tar.xz')
    Spawn(['tar', '-acf', report,
           '-C', os.path.join(self.tmpdir), 'events'],
          check_call=True)
    AssertMismatch(report)


class FakeSourceTreeTest(unittest.TestCase):
  """Creates and tests based on fake source trees."""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp(prefix='audit_source_hashes_unittest.')

    # Create a fake source tree and save the path in self.py.
    self.py = os.path.join(self.tmpdir, 'py')
    os.mkdir(self.py)
    file_utils.WriteFile(os.path.join(self.py, 'a.py'), 'A')
    file_utils.WriteFile(os.path.join(self.py, 'b.py'), 'B')
    file_utils.WriteFile(os.path.join(self.py, 'c.py'), 'C')

    # Replicate the source tree under a 'sample' directory.
    # Save the path in self.py2.
    sample = os.path.join(self.tmpdir, 'sample')
    os.mkdir(sample)
    self.py2 = os.path.join(sample, 'py')
    shutil.copytree(self.py, self.py2)

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _ModifyTree(self):
    """Modifies py2 to differ from py."""
    os.rename(os.path.join(self.py2, 'b.py'), os.path.join(self.py2, 'b2.py'))
    file_utils.WriteFile(os.path.join(self.py2, 'c.py'), 'C!')
    # Now there are mismatches that we should detect.

  def _AssertMismatches(self, golden_source):
    """Asserts that golden_source and py2 have the expected mismatches."""
    out = StringIO()
    self._ModifyTree()
    self.assertRaisesRegexp(
        SystemExit, '^1$', audit_source_hashes.main,
        ['-g', golden_source, self.py2], out)
    self.assertRegexpMatches(
        out.getvalue(),
        r'In sample .+:\n'
        r'- b\.py: missing from sample\n'
        r'- b2\.py: unexpected file encountered in sample\n'
        r'- c\.py: hash mismatch .+\n\n'
        r'Found 3 mismatched hashes and 0 exceptions\.\n'
        r'FAILED \(0/1 samples passed\)\.\n')

  def testMatches(self):
    """Tests that py matches py2."""
    out = StringIO()
    self.assertRaisesRegexp(
        SystemExit, '^0$', audit_source_hashes.main,
        ['-g', self.py, self.py2], out)
    self.assertEquals('PASSED (1/1 samples passed).\n', out.getvalue())

  def testMismatches(self):
    """Tests that comparing py and py2 yields the expected mismatches."""
    self._AssertMismatches(self.py)

  def testFactoryToolkit(self):
    """Tests with a fake factory toolkit as the golden."""
    # First, create toolkit_contents.tar.xz containing a tar file like
    # the one encoded into a real toolkit.
    factory_dir = os.path.join(self.tmpdir, 'usr', 'local', 'factory')
    os.makedirs(factory_dir)
    os.rename(self.py, os.path.join(factory_dir, 'py'))
    tar_file = os.path.join(self.tmpdir, 'toolkit_contents.tar.xz')
    Spawn(['tar', '-acf', tar_file, '-C', self.tmpdir, './usr'],
          call=True)

    # Build a fake factory toolkit that knows only how to process
    # "install_factory_toolkit.run --tar".  It will be called by
    # audit_source_hashes with args like "--tar -acf -C destdir".
    toolkit_path = os.path.join(self.tmpdir, 'install_factory_toolkit.run')
    file_utils.WriteFile(
        toolkit_path,
        '#!/bin/bash\n'
        '[ "$1" == "--tar" ] || exit 1\n'  # First arg must be '--tar'
        'shift\n'                          # Remove '--tar'
        'tar_flags="$1"\n'                 # Save tar flags
        'shift\n'                          # Remove tar flags
        'tar "$tar_flags" "%s" "$@"\n'     # Call tar on tar_file
        % tar_file)
    os.chmod(toolkit_path, 0555)

    # Now we can use the fake toolkit as a golden source.
    self._AssertMismatches(toolkit_path)

if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  unittest.main()
